/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.cassandra.db;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterators;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

import com.googlecode.concurrenttrees.common.Iterables;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.UpdateBuilder;
import org.apache.cassandra.Util;
import org.apache.cassandra.cql3.ColumnIdentifier;
import org.apache.cassandra.cql3.Operator;
import org.apache.cassandra.cql3.statements.schema.IndexTarget;
import org.apache.cassandra.db.ColumnFamilyStore.FlushReason;
import org.apache.cassandra.db.commitlog.CommitLogPosition;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.db.lifecycle.SSTableSet;
import org.apache.cassandra.db.memtable.AbstractMemtable;
import org.apache.cassandra.db.memtable.Memtable;
import org.apache.cassandra.db.partitions.FilteredPartition;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
import org.apache.cassandra.db.rows.Cell;
import org.apache.cassandra.db.rows.EncodingStats;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.index.Index;
import org.apache.cassandra.index.transactions.UpdateTransaction;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTableReadsListener;
import org.apache.cassandra.io.sstable.ScrubTest;
import org.apache.cassandra.io.sstable.format.SSTableFormat.Components;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.File;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.metrics.ClearableHistogram;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.IndexMetadata;
import org.apache.cassandra.schema.Indexes;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.schema.SchemaTestUtil;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.reads.SpeculativeRetryPolicy;
import org.apache.cassandra.service.snapshot.SnapshotOptions;
import org.apache.cassandra.service.snapshot.SnapshotManager;
import org.apache.cassandra.service.snapshot.SnapshotManifest;
import org.apache.cassandra.service.snapshot.SnapshotType;
import org.apache.cassandra.service.snapshot.TableSnapshot;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.WrappedRunnable;
import org.apache.cassandra.utils.concurrent.OpOrder.Barrier;
import org.apache.cassandra.utils.concurrent.OpOrder.Group;

import static org.apache.cassandra.cql3.statements.schema.IndexTarget.Type.VALUES;
import static org.apache.cassandra.schema.IndexMetadata.Kind.COMPOSITES;
import static org.apache.cassandra.schema.IndexMetadata.Kind.CUSTOM;
import static org.apache.cassandra.schema.IndexMetadata.fromIndexTargets;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

public class ColumnFamilyStoreTest
{
    public static final String KEYSPACE1 = "ColumnFamilyStoreTest1";
    public static final String KEYSPACE2 = "ColumnFamilyStoreTest2";
    public static final String KEYSPACE3 = "ColumnFamilyStoreTest3";
    public static final String CF_STANDARD1 = "Standard1";
    public static final String CF_STANDARD2 = "Standard2";
    public static final String CF_INDEX1 = "Indexed1";
    public static final String CF_SPEC_RETRY1 = "SpeculativeRetryTest1";

    @BeforeClass
    public static void defineSchema() throws ConfigurationException
    {
        SchemaLoader.prepareServer();
        SchemaLoader.createKeyspace(KEYSPACE1,
                                    KeyspaceParams.simple(1),
                                    SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1),
                                    SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD2),
                                    SchemaLoader.keysIndexCFMD(KEYSPACE1, CF_INDEX1, true));
        SchemaLoader.createKeyspace(KEYSPACE2,
                                    KeyspaceParams.simple(1),
                                    SchemaLoader.standardCFMD(KEYSPACE2, CF_STANDARD1));
        SchemaLoader.createKeyspace(KEYSPACE3,
                                    KeyspaceParams.simple(1),
                                    SchemaLoader.standardCFMD(KEYSPACE3, CF_SPEC_RETRY1)
                                                .speculativeRetry(SpeculativeRetryPolicy.fromString("50PERCENTILE"))
                                                .additionalWritePolicy(SpeculativeRetryPolicy.fromString("75PERCENTILE")));
    }

    @Before
    public void truncateCFS()
    {
        Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1).truncateBlockingWithoutSnapshot();
        Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD2).truncateBlockingWithoutSnapshot();
        Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_INDEX1).truncateBlockingWithoutSnapshot();
        Keyspace.open(KEYSPACE2).getColumnFamilyStore(CF_STANDARD1).truncateBlockingWithoutSnapshot();
        Keyspace.open(KEYSPACE3).getColumnFamilyStore(CF_SPEC_RETRY1).truncateBlockingWithoutSnapshot();

        SnapshotManager.instance.clearAllSnapshots(KEYSPACE1, CF_STANDARD1);
        SnapshotManager.instance.clearAllSnapshots(KEYSPACE1, CF_STANDARD2);
        SnapshotManager.instance.clearAllSnapshots(KEYSPACE1, CF_INDEX1);
        SnapshotManager.instance.clearAllSnapshots(KEYSPACE2, CF_STANDARD1);
        SnapshotManager.instance.clearAllSnapshots(KEYSPACE3, CF_SPEC_RETRY1);
    }

    @After
    public void afterTest()
    {
        SnapshotManager.instance.clearAllSnapshots();
    }

    @Test
    public void testMemtableTimestamp()
    {
        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1);
        assertEquals(Memtable.NO_MIN_TIMESTAMP, fakeMemTableWithMinTS(cfs, EncodingStats.NO_STATS.minTimestamp).getMinTimestamp());
    }

    @Test
    // create two sstables, and verify that we only deserialize data from the most recent one
    public void testTimeSortedQuery()
    {
        Keyspace keyspace = Keyspace.open(KEYSPACE1);
        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1);

        new RowUpdateBuilder(cfs.metadata(), 0, "key1")
        .clustering("Column1")
        .add("val", "asdf")
        .build()
        .applyUnsafe();
        Util.flush(cfs);

        new RowUpdateBuilder(cfs.metadata(), 1, "key1")
        .clustering("Column1")
        .add("val", "asdf")
        .build()
        .applyUnsafe();
        Util.flush(cfs);

        ((ClearableHistogram) cfs.metric.sstablesPerReadHistogram.cf).clear(); // resets counts
        Util.getAll(Util.cmd(cfs, "key1").includeRow("c1").build());
        assertEquals(1, cfs.metric.sstablesPerReadHistogram.cf.getCount());
    }

    @Test
    public void testGetColumnWithWrongBF()
    {
        Keyspace keyspace = Keyspace.open(KEYSPACE1);
        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1);
        keyspace.getColumnFamilyStores().forEach(ColumnFamilyStore::truncateBlockingWithoutSnapshot);

        List<Mutation> rms = new LinkedList<>();
        rms.add(new RowUpdateBuilder(cfs.metadata(), 0, "key1")
                .clustering("Column1")
                .add("val", "asdf")
                .build());

        Util.writeColumnFamily(rms);

        List<SSTableReader> ssTables = keyspace.getAllSSTables(SSTableSet.LIVE);
        assertEquals(1, ssTables.size());
        Util.disableBloomFilter(cfs);
        Util.assertEmpty(Util.cmd(cfs, "key2").build());
    }

    @Test
    public void testEmptyRow() throws Exception
    {
        Keyspace keyspace = Keyspace.open(KEYSPACE1);
        final ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD2);

        RowUpdateBuilder.deleteRow(cfs.metadata(), FBUtilities.timestampMicros(), "key1", "Column1").applyUnsafe();

        Runnable r = new WrappedRunnable()
        {
            public void runMayThrow() throws IOException
            {
                Row toCheck = Util.getOnlyRowUnfiltered(Util.cmd(cfs, "key1").build());
                Iterator<Cell<?>> iter = toCheck.cells().iterator();
                assert (Iterators.size(iter) == 0);
            }
        };

        reTest(cfs, r);
    }

    @Test
    public void testDeleteStandardRowSticksAfterFlush()
    {
        // test to make sure flushing after a delete doesn't resurrect delted cols.
        String keyspaceName = KEYSPACE1;
        String cfName = CF_STANDARD1;
        Keyspace keyspace = Keyspace.open(keyspaceName);
        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfName);

        ByteBuffer col = ByteBufferUtil.bytes("val");
        ByteBuffer val = ByteBufferUtil.bytes("val1");

        // insert
        Mutation.SimpleBuilder builder = Mutation.simpleBuilder(keyspaceName, cfs.metadata().partitioner.decorateKey(ByteBufferUtil.bytes("val2")));
        builder.update(cfName).row("Column1").add("val", "val1").build();

        new RowUpdateBuilder(cfs.metadata(), 0, "key1").clustering("Column1").add("val", "val1").build().applyUnsafe();
        new RowUpdateBuilder(cfs.metadata(), 0, "key2").clustering("Column1").add("val", "val1").build().applyUnsafe();
        assertRangeCount(cfs, col, val, 2);

        // flush.
        Util.flush(cfs);

        // insert, don't flush
        new RowUpdateBuilder(cfs.metadata(), 1, "key3").clustering("Column1").add("val", "val1").build().applyUnsafe();
        new RowUpdateBuilder(cfs.metadata(), 1, "key4").clustering("Column1").add("val", "val1").build().applyUnsafe();
        assertRangeCount(cfs, col, val, 4);

        // delete (from sstable and memtable)
        RowUpdateBuilder.deleteRow(cfs.metadata(), 5, "key1", "Column1").applyUnsafe();
        RowUpdateBuilder.deleteRow(cfs.metadata(), 5, "key3", "Column1").applyUnsafe();

        // verify delete
        assertRangeCount(cfs, col, val, 2);

        // flush
        Util.flush(cfs);

        // re-verify delete. // first breakage is right here because of CASSANDRA-1837.
        assertRangeCount(cfs, col, val, 2);

        // simulate a 'late' insertion that gets put in after the deletion. should get inserted, but fail on read.
        new RowUpdateBuilder(cfs.metadata(), 2, "key1").clustering("Column1").add("val", "val1").build().applyUnsafe();
        new RowUpdateBuilder(cfs.metadata(), 2, "key3").clustering("Column1").add("val", "val1").build().applyUnsafe();

        // should still be nothing there because we deleted this row. 2nd breakage, but was undetected because of 1837.
        assertRangeCount(cfs, col, val, 2);

        // make sure that new writes are recognized.
        new RowUpdateBuilder(cfs.metadata(), 10, "key5").clustering("Column1").add("val", "val1").build().applyUnsafe();
        new RowUpdateBuilder(cfs.metadata(), 10, "key6").clustering("Column1").add("val", "val1").build().applyUnsafe();
        assertRangeCount(cfs, col, val, 4);

        // and it remains so after flush. (this wasn't failing before, but it's good to check.)
        Util.flush(cfs);
        assertRangeCount(cfs, col, val, 4);
    }

    @Test
    public void testClearEphemeralSnapshots()
    {
        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_INDEX1);

        int numRows = 1000;
        long[] colValues = new long[numRows * 2]; // each row has two columns
        for (int i = 0; i < colValues.length; i += 2)
        {
            colValues[i] = (i % 4 == 0 ? 1L : 2L); // index column
            colValues[i + 1] = 3L; //other column
        }
        ScrubTest.fillIndexCF(cfs, false, colValues);

        SnapshotManager.instance.takeSnapshot(SnapshotOptions.systemSnapshot("nonEphemeralSnapshot", SnapshotType.MISC, cfs.getKeyspaceTableName()).build());
        SnapshotManager.instance.takeSnapshot(SnapshotOptions.systemSnapshot("ephemeralSnapshot", SnapshotType.REPAIR, cfs.getKeyspaceTableName()).ephemeral().build());

        assertTrue(SnapshotManager.instance.exists(p -> p.getTag().endsWith("ephemeralSnapshot")));
        assertTrue(SnapshotManager.instance.exists(p -> p.getTag().endsWith("nonEphemeralSnapshot")));

        SnapshotManager.instance.clearEphemeralSnapshots();

        List<TableSnapshot> snapshots = SnapshotManager.instance.getSnapshots(p -> true);
        assertEquals(1, snapshots.size());


        assertTrue(snapshots.get(0).getTag().endsWith("nonEphemeralSnapshot"));
    }

    @Test
    public void testSnapshotSize() throws IOException
    {
        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1);

        // Add row
        new RowUpdateBuilder(cfs.metadata(), 0, "key1")
        .clustering("Column1")
        .add("val", "asdf")
        .build()
        .applyUnsafe();
        Util.flush(cfs);

        // snapshot
        SnapshotManager.instance.takeSnapshot("basic", cfs.getKeyspaceTableName());

        // check snapshot was created
        Map<String, TableSnapshot> snapshotDetails = Util.listSnapshots(cfs);
        assertThat(snapshotDetails).hasSize(1);
        assertThat(snapshotDetails).containsKey("basic");

        // check that sizeOnDisk > trueSize = 0
        TableSnapshot details = snapshotDetails.get("basic");
        assertThat(details.computeSizeOnDiskBytes()).isGreaterThan(details.computeTrueSizeBytes());
        assertThat(details.computeTrueSizeBytes()).isEqualTo(getSnapshotManifestAndSchemaFileSizes(details));

        // compact base table to make trueSize > 0
        cfs.forceMajorCompaction();
        LifecycleTransaction.waitForDeletions();

        // sizeOnDisk > trueSize because trueSize does not include manifest.json
        // Check that truesize now is > 0
        snapshotDetails = Util.listSnapshots(cfs);
        details = snapshotDetails.get("basic");
        assertThat(details.computeSizeOnDiskBytes()).isEqualTo(details.computeTrueSizeBytes());
    }

    @Test
    public void testTrueSnapshotSizeWithLegacyIndex() throws Throwable
    {
        testTrueSnapshotSizeInternal("snapshot_sizes_legacy", "true_snapshot_size_with_legacy_index", false);
    }

    @Test
    public void testTrueSnapshotSizeWithSaiIndex() throws Throwable
    {
        testTrueSnapshotSizeInternal("snapshot_sizes_sai","true_snapshot_size_with_sai_index", true);
    }

    private void testTrueSnapshotSizeInternal(String keyspace, String table, boolean forSai) throws Throwable
    {
        TableMetadata tableMetadata = SchemaLoader.standardCFMD(keyspace, table).partitioner(Murmur3Partitioner.instance).build();
        SchemaLoader.createKeyspace(keyspace, KeyspaceParams.simple(1), tableMetadata);

        ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(table);

        new RowUpdateBuilder(cfs.metadata(), 0, "key1")
        .clustering("Column1")
        .add("val", "value1")
        .build()
        .applyUnsafe();
        Util.flush(cfs);

        assertThat(cfs.trueSnapshotsSize()).isZero();

        SnapshotManager.instance.takeSnapshot("snapshot0_without_index", cfs.getKeyspaceTableName());

        long sizeOfOneSnapshot = cfs.trueSnapshotsSize();
        // test that true snapshot size computed from manager is same as true snapshot size from cfs with one snapshot
        long trueSnapshotSizeViaManagerWithoutIndex = SnapshotManager.instance.getTrueSnapshotsSize(cfs.getKeyspaceName(), cfs.getTableName(), "snapshot0_without_index");
        Assert.assertEquals(sizeOfOneSnapshot, trueSnapshotSizeViaManagerWithoutIndex);
        Map<String, TableSnapshot> listedSnapshots = Util.listSnapshots(cfs);
        assertThat(sizeOfOneSnapshot).isPositive();
        assertThat(listedSnapshots.size()).isEqualTo(1);
        assertThat(listedSnapshots.get("snapshot0_without_index")).isNotNull();
        long withoutIndexSize = listedSnapshots.get("snapshot0_without_index").computeSizeOnDiskBytes();
        long withoutIndexTrueSize = listedSnapshots.get("snapshot0_without_index").computeTrueSizeBytes();

        assertThat(withoutIndexSize).isGreaterThan(withoutIndexTrueSize);
        assertEquals(sizeOfOneSnapshot, withoutIndexTrueSize);

        // add index, trueSnapshotSize should reflect that
        ColumnIdentifier col = ColumnIdentifier.getInterned("val", true);

        IndexMetadata indexMetadata;
        if (forSai)
            indexMetadata = fromIndexTargets(List.of(new IndexTarget(col, VALUES)), "idx", CUSTOM, Map.of("class_name", "sai"));
        else
            indexMetadata = fromIndexTargets(List.of(new IndexTarget(col, VALUES)), "idx", COMPOSITES, Map.of());

        TableMetadata tableMetadataWithIndex = SchemaLoader.standardCFMD(keyspace, table)
                                                           .partitioner(Murmur3Partitioner.instance)
                                                           .id(tableMetadata.id)
                                                           .indexes(Indexes.of(indexMetadata))
                                                           .build();

        SchemaTestUtil.announceTableUpdate(tableMetadataWithIndex);

        rebuildIndices(cfs);

        SnapshotManager.instance.takeSnapshot("snapshot1_with_index", new HashMap<>(), cfs.getKeyspaceTableName());

        long sizeOfTwoSnapshots = cfs.trueSnapshotsSize();
        long trueSnapshotSizeViaManagerWithIndex = SnapshotManager.instance.getTrueSnapshotsSize(cfs.getKeyspaceName(), cfs.getTableName(), "snapshot1_with_index");

        assertThat(trueSnapshotSizeViaManagerWithIndex).isGreaterThan(trueSnapshotSizeViaManagerWithoutIndex);

        assertThat(sizeOfTwoSnapshots).isPositive();
        assertThat(sizeOfTwoSnapshots).isGreaterThan(sizeOfOneSnapshot);

        listedSnapshots = Util.listSnapshots(cfs);
        assertThat(listedSnapshots.size()).isEqualTo(2);
        assertThat(listedSnapshots.get("snapshot1_with_index")).isNotNull();
        assertThat(listedSnapshots.get("snapshot0_without_index")).isNotNull();

        long withIndexSize = listedSnapshots.get("snapshot1_with_index").computeSizeOnDiskBytes();
        long withIndexTrueSize = listedSnapshots.get("snapshot1_with_index").computeTrueSizeBytes();

        assertThat(withIndexSize).isGreaterThan(withIndexTrueSize);
        assertEquals(sizeOfTwoSnapshots, withIndexTrueSize + withoutIndexTrueSize);

        // taking another one is basically a copy of the previous
        SnapshotManager.instance.takeSnapshot("snapshot2_with_index", new HashMap<>(), cfs.getKeyspaceTableName());

        long sizeOfThreeSnapshots = cfs.trueSnapshotsSize();
        assertThat(sizeOfThreeSnapshots).isPositive();
        assertThat(sizeOfThreeSnapshots).isGreaterThan(sizeOfTwoSnapshots);

        listedSnapshots = Util.listSnapshots(cfs);
        long anotherWithIndexSize = listedSnapshots.get("snapshot2_with_index").computeSizeOnDiskBytes();
        long anotherWithIndexTrueSize = listedSnapshots.get("snapshot2_with_index").computeTrueSizeBytes();

        assertEquals(withIndexSize, anotherWithIndexSize);
        assertEquals(withIndexTrueSize, anotherWithIndexTrueSize);
    }

    private void rebuildIndices(ColumnFamilyStore cfs)
    {
        // be sure we build that index
        for (Index index : cfs.indexManager.listIndexes())
        {
            try
            {
                cfs.indexManager.buildIndex(index).get();
            }
            catch (Throwable t)
            {
                throw new RuntimeException("Unable to build index", t);
            }
        }
    }

    @Test
    public void testBackupAfterFlush()
    {
        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE2).getColumnFamilyStore(CF_STANDARD1);
        new RowUpdateBuilder(cfs.metadata(), 0, ByteBufferUtil.bytes("key1")).clustering("Column1").add("val", "asdf").build().applyUnsafe();
        Util.flush(cfs);
        new RowUpdateBuilder(cfs.metadata(), 0, ByteBufferUtil.bytes("key2")).clustering("Column1").add("val", "asdf").build().applyUnsafe();
        Util.flush(cfs);

        for (SSTableReader liveSSTable : cfs.getLiveSSTables())
        {
            Descriptor existing = liveSSTable.descriptor;
            Descriptor desc = new Descriptor(Directories.getBackupsDirectory(existing),
                                             KEYSPACE2,
                                             CF_STANDARD1,
                                             liveSSTable.descriptor.id,
                                             liveSSTable.descriptor.version.format);
            for (Component c : liveSSTable.getComponents())
                assertTrue("Cannot find backed-up file:" + desc.fileFor(c), desc.fileFor(c).exists());
        }
    }

    @Test
    public void speculationThreshold()
    {
        // CF_SPEC_RETRY1 configured to use the 50th percentile for read and 75th percentile for write
        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE3).getColumnFamilyStore(CF_SPEC_RETRY1);

        cfs.sampleReadLatencyMicros = 123000;
        cfs.additionalWriteLatencyMicros = 234000;

        // test updating before any stats are present
        cfs.updateSpeculationThreshold();
        assertThat(cfs.sampleReadLatencyMicros).isEqualTo(123000);
        assertThat(cfs.additionalWriteLatencyMicros).isEqualTo(234000);

        // Seed the column family with some latency data.
        final int count = 10000;
        for (int millis = 0; millis < count; millis++)
        {
            cfs.metric.coordinatorReadLatency.update(millis, TimeUnit.MILLISECONDS);
            cfs.metric.coordinatorWriteLatency.update(millis, TimeUnit.MILLISECONDS);
        }
        // Sanity check the metrics - 50th percentile of linear 0-10000ms
        // remember, latencies are only an estimate - off by up to 20% by the 1.2 factor between buckets.
        assertThat(cfs.metric.coordinatorReadLatency.getCount()).isEqualTo(count);
        assertThat(cfs.metric.coordinatorReadLatency.getSnapshot().getValue(0.5))
        .isBetween((double) TimeUnit.MILLISECONDS.toMicros(5839),
                   (double) TimeUnit.MILLISECONDS.toMicros(5840));
        // Sanity check the metrics - 75th percentileof linear 0-10000ms
        assertThat(cfs.metric.coordinatorWriteLatency.getCount()).isEqualTo(count);
        assertThat(cfs.metric.coordinatorWriteLatency.getSnapshot().getValue(0.75))
        .isBetween((double) TimeUnit.MILLISECONDS.toMicros(8409),
                   (double) TimeUnit.MILLISECONDS.toMicros(8410));

        // CF_SPEC_RETRY1 configured to use the 50th percentile for speculation
        cfs.updateSpeculationThreshold();

        assertThat(cfs.sampleReadLatencyMicros).isBetween(TimeUnit.MILLISECONDS.toMicros(5839), TimeUnit.MILLISECONDS.toMicros(5840));
        assertThat(cfs.additionalWriteLatencyMicros).isBetween(TimeUnit.MILLISECONDS.toMicros(8409), TimeUnit.MILLISECONDS.toMicros(8410));
    }

    // TODO: Fix once we have working supercolumns in 8099
//    // CASSANDRA-3467.  the key here is that supercolumn and subcolumn comparators are different
//    @Test
//    public void testSliceByNamesCommandOnUUIDTypeSCF() throws Throwable
//    {
//        String keyspaceName = KEYSPACE1;
//        String cfName = CF_SUPER6;
//        ByteBuffer superColName = LexicalUUIDType.instance.fromString("a4ed3562-0e8e-4b41-bdfd-c45a2774682d");
//        Keyspace keyspace = Keyspace.open(keyspaceName);
//        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfName);
//        DecoratedKey key = Util.dk("slice-get-uuid-type");
//
//        // Insert a row with one supercolumn and multiple subcolumns
//        putColsSuper(cfs, key, superColName, new BufferCell(cellname("a"), ByteBufferUtil.bytes("A"), 1),
//                                             new BufferCell(cellname("b"), ByteBufferUtil.bytes("B"), 1));
//
//        // Get the entire supercolumn like normal
//        ColumnFamily cfGet = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key, cfName, System.currentTimeMillis()));
//        assertEquals(ByteBufferUtil.bytes("A"), cfGet.getColumn(CellNames.compositeDense(superColName, ByteBufferUtil.bytes("a"))).value());
//        assertEquals(ByteBufferUtil.bytes("B"), cfGet.getColumn(CellNames.compositeDense(superColName, ByteBufferUtil.bytes("b"))).value());
//
//        // Now do the SliceByNamesCommand on the supercolumn, passing both subcolumns in as columns to get
//        SortedSet<CellName> sliceColNames = new TreeSet<CellName>(cfs.metadata.comparator);
//        sliceColNames.add(CellNames.compositeDense(superColName, ByteBufferUtil.bytes("a")));
//        sliceColNames.add(CellNames.compositeDense(superColName, ByteBufferUtil.bytes("b")));
//        SliceByNamesReadCommand cmd = new SliceByNamesReadCommand(keyspaceName, key.getKey(), cfName, System.currentTimeMillis(), new NamesQueryFilter(sliceColNames));
//        ColumnFamily cfSliced = cmd.getRow(keyspace).cf;
//
//        // Make sure the slice returns the same as the straight get
//        assertEquals(ByteBufferUtil.bytes("A"), cfSliced.getColumn(CellNames.compositeDense(superColName, ByteBufferUtil.bytes("a"))).value());
//        assertEquals(ByteBufferUtil.bytes("B"), cfSliced.getColumn(CellNames.compositeDense(superColName, ByteBufferUtil.bytes("b"))).value());
//    }


    // TODO: Fix once SSTableSimpleWriter's back in
    // @see <a href="https://issues.apache.org/jira/browse/CASSANDRA-6086">CASSANDRA-6086</a>


    // TODO: Fix once SSTableSimpleWriter's back in
//    @Test
//    public void testLoadNewSSTablesAvoidsOverwrites() throws Throwable
//    {
//        String ks = KEYSPACE1;
//        String cf = CF_STANDARD1;
//        ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore(cf);
//        SSTableDeletingTask.waitForDeletions();
//
//        final CFMetaData cfmeta = Schema.instance.getTableMetadataRef(ks, cf);
//        Directories dir = new Directories(cfs.metadata);
//
//        // clear old SSTables (probably left by CFS.clearUnsafe() calls in other tests)
//        for (Map.Entry<Descriptor, Set<Component>> entry : dir.sstableLister().list().entrySet())
//        {
//            for (Component component : entry.getValue())
//            {
//                FileUtils.delete(entry.getKey().filenameFor(component));
//            }
//        }
//
//        // sanity check
//        int existingSSTables = dir.sstableLister().list().keySet().size();
//        assert existingSSTables == 0 : String.format("%d SSTables unexpectedly exist", existingSSTables);
//
//        ByteBuffer key = bytes("key");
//
//        SSTableSimpleWriter writer = new SSTableSimpleWriter(dir.getDirectoryForNewSSTables(),
//                                                             cfmeta, StorageService.getPartitioner())
//        {
//            @Override
//            protected SSTableWriter getWriter()
//            {
//                // hack for reset generation
//                generation.set(0);
//                return super.getWriter();
//            }
//        };
//        writer.newRow(key);
//        writer.addColumn(bytes("col"), bytes("val"), 1);
//        writer.close();
//
//        writer = new SSTableSimpleWriter(dir.getDirectoryForNewSSTables(),
//                                         cfmeta, StorageService.getPartitioner());
//        writer.newRow(key);
//        writer.addColumn(bytes("col"), bytes("val"), 1);
//        writer.close();
//
//        Set<Integer> generations = new HashSet<>();
//        for (Descriptor descriptor : dir.sstableLister().list().keySet())
//            generations.add(descriptor.generation);
//
//        // we should have two generations: [1, 2]
//        assertEquals(2, generations.size());
//        assertTrue(generations.contains(1));
//        assertTrue(generations.contains(2));
//
//        assertEquals(0, cfs.getLiveSSTables().size());
//
//        // start the generation counter at 1 again (other tests have incremented it already)
//        cfs.resetFileIndexGenerator();
//
//        boolean incrementalBackupsEnabled = DatabaseDescriptor.isIncrementalBackupsEnabled();
//        try
//        {
//            // avoid duplicate hardlinks to incremental backups
//            DatabaseDescriptor.setIncrementalBackupsEnabled(false);
//            cfs.loadNewSSTables();
//        }
//        finally
//        {
//            DatabaseDescriptor.setIncrementalBackupsEnabled(incrementalBackupsEnabled);
//        }
//
//        assertEquals(2, cfs.getLiveSSTables().size());
//        generations = new HashSet<>();
//        for (Descriptor descriptor : dir.sstableLister().list().keySet())
//            generations.add(descriptor.generation);
//
//        // normally they would get renamed to generations 1 and 2, but since those filenames already exist,
//        // they get skipped and we end up with generations 3 and 4
//        assertEquals(2, generations.size());
//        assertTrue(generations.contains(3));
//        assertTrue(generations.contains(4));
//    }

    public void reTest(ColumnFamilyStore cfs, Runnable verify) throws Exception
    {
        verify.run();
        Util.flush(cfs);
        verify.run();
    }

    private void assertRangeCount(ColumnFamilyStore cfs, ByteBuffer col, ByteBuffer val, int count)
    {
        assertRangeCount(cfs, cfs.metadata().getColumn(col), val, count);
    }

    private void assertRangeCount(ColumnFamilyStore cfs, ColumnMetadata col, ByteBuffer val, int count)
    {

        int found = 0;
        if (count != 0)
        {
            for (FilteredPartition partition : Util.getAll(Util.cmd(cfs).filterOn(col.name.toString(), Operator.EQ, val).build()))
            {
                for (Row r : partition)
                {
                    if (r.getCell(col).buffer().equals(val))
                        ++found;
                }
            }
        }
        assertEquals(count, found);
    }

    @Test
    public void testSnapshotWithoutFlushWithSecondaryIndexes() throws Exception
    {
        Keyspace keyspace = Keyspace.open(KEYSPACE1);
        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_INDEX1);
        cfs.truncateBlockingWithoutSnapshot();

        UpdateBuilder builder = UpdateBuilder.create(cfs.metadata.get(), "key")
                                             .newRow()
                                             .add("birthdate", 1L)
                                             .add("notbirthdate", 2L);
        new Mutation(builder.build()).applyUnsafe();
        Util.flush(cfs);

        String snapshotName = "newSnapshot";
        SnapshotManager.instance.takeSnapshot(snapshotName, Map.of(SnapshotOptions.SKIP_FLUSH, "true"), cfs.getKeyspaceTableName());

        File snapshotManifestFile = cfs.getDirectories().getSnapshotManifestFile(snapshotName);
        SnapshotManifest manifest = SnapshotManifest.deserializeFromJsonFile(snapshotManifestFile);

        // Keyspace1-Indexed1 and the corresponding index
        assertThat(manifest.getFiles()).hasSize(2);

        // Snapshot of the secondary index is stored in the subfolder with the same file name
        String baseTableFile = manifest.getFiles().get(0);
        String indexTableFile = manifest.getFiles().get(1);
        assertThat(baseTableFile).isNotEqualTo(indexTableFile);
        assertThat(Directories.isSecondaryIndexFolder(new File(indexTableFile).parent())).isTrue();

        Set<String> originalFiles = new HashSet<>();
        Iterables.toList(cfs.concatWithIndexes()).stream()
                 .flatMap(c -> c.getLiveSSTables().stream().map(t -> t.descriptor.fileFor(Components.DATA)))
                 .forEach(e -> originalFiles.add(e.toString()));
        assertThat(originalFiles.stream().anyMatch(f -> f.endsWith(indexTableFile))).isTrue();
        assertThat(originalFiles.stream().anyMatch(f -> f.endsWith(baseTableFile))).isTrue();
    }

    private void createSnapshotAndDelete(String ks, String table, boolean writeData)
    {
        ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore(table);
        if (writeData)
        {
            writeData(cfs);
        }

        SnapshotManager.instance.takeSnapshot("basic", cfs.getKeyspaceTableName());

        TableSnapshot snapshot = SnapshotManager.instance.getSnapshot(cfs.metadata.keyspace, cfs.metadata.name, "basic").get();
        assertNotNull(snapshot);
        assertThat(snapshot.exists()).isTrue();
        assertThat(Util.listSnapshots(cfs).containsKey("basic")).isTrue();
        assertThat(Util.listSnapshots(cfs).get("basic")).isEqualTo(snapshot);

        snapshot.getDirectories().forEach(FileUtils::deleteRecursive);

        assertThat(snapshot.exists()).isFalse();
        assertFalse(Util.listSnapshots(cfs).containsKey("basic"));
    }

    private void writeData(ColumnFamilyStore cfs)
    {
        if (cfs.name.equals(CF_INDEX1))
        {
            new RowUpdateBuilder(cfs.metadata(), 2, "key").add("birthdate", 1L).add("notbirthdate", 2L).build().applyUnsafe();
            Util.flush(cfs);
        }
        else
        {
            new RowUpdateBuilder(cfs.metadata(), 2, "key").clustering("name").add("val", "2").build().applyUnsafe();
            Util.flush(cfs);
        }
    }

    @Test
    public void testSnapshotCreationAndDeleteEmptyTable()
    {
        createSnapshotAndDelete(KEYSPACE1, CF_INDEX1, false);
        createSnapshotAndDelete(KEYSPACE1, CF_STANDARD1, false);
        createSnapshotAndDelete(KEYSPACE1, CF_STANDARD2, false);
        createSnapshotAndDelete(KEYSPACE2, CF_STANDARD1, false);
        createSnapshotAndDelete(SchemaConstants.SYSTEM_KEYSPACE_NAME, SystemKeyspace.TRANSFERRED_RANGES_V2, false);
    }

    @Test
    public void testSnapshotCreationAndDeletePopulatedTable()
    {
        createSnapshotAndDelete(KEYSPACE1, CF_INDEX1, true);
        createSnapshotAndDelete(KEYSPACE1, CF_STANDARD1, true);
        createSnapshotAndDelete(KEYSPACE1, CF_STANDARD2, true);
        createSnapshotAndDelete(KEYSPACE2, CF_STANDARD1, true);
    }

    @Test
    public void testDataDirectoriesOfColumnFamily() throws Exception
    {
        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1);
        List<String> dataPaths = cfs.getDataPaths();
        Assert.assertFalse(dataPaths.isEmpty());

        Path path = Paths.get(dataPaths.get(0));

        String keyspace = path.getParent().getFileName().toString();
        String table = path.getFileName().toString().split("-")[0];

        assertEquals(cfs.getTableName(), table);
        assertEquals(KEYSPACE1, keyspace);
    }

    @Test
    public void testScrubDataDirectories() throws Throwable
    {
        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1);

        ColumnFamilyStore.scrubDataDirectories(cfs.metadata());

        new RowUpdateBuilder(cfs.metadata(), 2, "key").clustering("name").add("val", "2").build().applyUnsafe();
        Util.flush(cfs);

        // Nuke the metadata and reload that sstable
        Collection<SSTableReader> ssTables = cfs.getLiveSSTables();
        assertEquals(1, ssTables.size());
        SSTableReader ssTable = ssTables.iterator().next();

        File dataFile = ssTable.descriptor.fileFor(Components.DATA);
        File tmpDataFile = ssTable.descriptor.tmpFileFor(Components.DATA);
        dataFile.tryMove(tmpDataFile);

        ssTable.selfRef().release();

        ColumnFamilyStore.scrubDataDirectories(cfs.metadata());

        List<File> ssTableFiles = new Directories(cfs.metadata()).sstableLister(Directories.OnTxnErr.THROW).listFiles();
        assertNotNull(ssTableFiles);
        assertEquals(0, ssTableFiles.size());
        cfs.clearUnsafe();
    }

    @VisibleForTesting
    public static long getSnapshotManifestAndSchemaFileSizes(TableSnapshot snapshot)
    {
        long schemaAndManifestFileSizes = 0;

        for (File schemaFile : snapshot.getSchemaFiles())
            schemaAndManifestFileSizes += schemaFile.length();

        for (File manifestFile : snapshot.getManifestFiles())
            schemaAndManifestFileSizes += manifestFile.length();

        return schemaAndManifestFileSizes;
    }

    private Memtable fakeMemTableWithMinTS(ColumnFamilyStore cfs, long minTS)
    {
        return new AbstractMemtable(cfs.metadata, minTS)
        {

            @Override
            public long put(PartitionUpdate update, UpdateTransaction indexer, Group opGroup, boolean assumeMissing)
            {
                return 0;
            }

            @Override
            public long partitionCount()
            {
                return 0;
            }

            @Override
            public long getLiveDataSize()
            {
                return 0;
            }

            @Override
            public void addMemoryUsageTo(MemoryUsage usage)
            {
            }

            @Override
            public void markExtraOnHeapUsed(long additionalSpace, Group opGroup)
            {
            }

            @Override
            public void markExtraOffHeapUsed(long additionalSpace, Group opGroup)
            {
            }

            @Override
            public FlushablePartitionSet<?> getFlushSet(PartitionPosition from, PartitionPosition to)
            {
                return null;
            }

            @Override
            public void switchOut(Barrier writeBarrier, AtomicReference<CommitLogPosition> commitLogUpperBound)
            {
            }

            @Override
            public void discard()
            {
            }

            @Override
            public boolean accepts(Group opGroup, CommitLogPosition commitLogPosition)
            {
                return false;
            }

            @Override
            public CommitLogPosition getApproximateCommitLogLowerBound()
            {
                return null;
            }

            @Override
            public CommitLogPosition getCommitLogLowerBound()
            {
                return null;
            }

            @Override
            public LastCommitLogPosition getFinalCommitLogUpperBound()
            {
                return null;
            }

            @Override
            public boolean mayContainDataBefore(CommitLogPosition position)
            {
                return false;
            }

            @Override
            public boolean isClean()
            {
                return false;
            }

            @Override
            public boolean shouldSwitch(FlushReason reason)
            {
                return false;
            }

            @Override
            public <T extends Consumer<TableMetadata>> T ensureFlushListener(Object key, Supplier<T> onDurablyFlushed)
            {
                return null;
            }

            @Override
            public void notifyFlushed()
            {
            }

            @Override
            public boolean shouldSwitch(FlushReason reason, TableMetadata latest)
            {
                return false;
            }

            @Override
            public void metadataUpdated()
            {
            }

            @Override
            public void localRangesUpdated()
            {
            }

            @Override
            public void performSnapshot(String snapshotName)
            {
            }

            @Override
            public UnfilteredRowIterator rowIterator(DecoratedKey key,
                                                     Slices slices,
                                                     ColumnFilter columnFilter,
                                                     boolean reversed,
                                                     SSTableReadsListener listener)
            {
                return null;
            }

            @Override
            public UnfilteredPartitionIterator
            partitionIterator(ColumnFilter columnFilter, DataRange dataRange, SSTableReadsListener listener)
            {
                return null;
            }
        };
    }
}
